# frozen_string_literal: true
begin
  require "openssl"
rescue LoadError
end

require "test/unit"
require "core_assertions"
require "tempfile"
require "socket"

if defined?(OpenSSL)

module OpenSSL::TestUtils
  module Fixtures
    module_function

    def pkey(name)
      OpenSSL::PKey.read(read_file("pkey", name))
    end

    def read_file(category, name)
      @file_cache ||= {}
      @file_cache[[category, name]] ||=
        File.read(File.join(__dir__, "fixtures", category, name + ".pem"))
    end

    def file_path(category, name)
      File.join(__dir__, "fixtures", category, name)
    end
  end

  module_function

  def generate_cert(dn, key, serial, issuer,
                    not_before: nil, not_after: nil)
    cert = OpenSSL::X509::Certificate.new
    issuer = cert unless issuer
    cert.version = 2
    cert.serial = serial
    cert.subject = dn
    cert.issuer = issuer.subject
    cert.public_key = key
    now = Time.now
    cert.not_before = not_before || now - 3600
    cert.not_after = not_after || now + 3600
    cert
  end


  def issue_cert(dn, key, serial, extensions, issuer, issuer_key,
                 not_before: nil, not_after: nil, digest: "sha256")
    cert = generate_cert(dn, key, serial, issuer,
                         not_before: not_before, not_after: not_after)
    issuer = cert unless issuer
    issuer_key = key unless issuer_key
    ef = OpenSSL::X509::ExtensionFactory.new
    ef.subject_certificate = cert
    ef.issuer_certificate = issuer
    extensions.each{|oid, value, critical|
      cert.add_extension(ef.create_extension(oid, value, critical))
    }
    cert.sign(issuer_key, digest)
    cert
  end

  def issue_crl(revoke_info, serial, lastup, nextup, extensions,
                issuer, issuer_key, digest)
    crl = OpenSSL::X509::CRL.new
    crl.issuer = issuer.subject
    crl.version = 1
    crl.last_update = lastup
    crl.next_update = nextup
    revoke_info.each{|rserial, time, reason_code|
      revoked = OpenSSL::X509::Revoked.new
      revoked.serial = rserial
      revoked.time = time
      enum = OpenSSL::ASN1::Enumerated(reason_code)
      ext = OpenSSL::X509::Extension.new("CRLReason", enum)
      revoked.add_extension(ext)
      crl.add_revoked(revoked)
    }
    ef = OpenSSL::X509::ExtensionFactory.new
    ef.issuer_certificate = issuer
    ef.crl = crl
    crlnum = OpenSSL::ASN1::Integer(serial)
    crl.add_extension(OpenSSL::X509::Extension.new("crlNumber", crlnum))
    extensions.each{|oid, value, critical|
      crl.add_extension(ef.create_extension(oid, value, critical))
    }
    crl.sign(issuer_key, digest)
    crl
  end

  def get_subject_key_id(cert, hex: true)
    asn1_cert = OpenSSL::ASN1.decode(cert)
    tbscert   = asn1_cert.value[0]
    pkinfo    = tbscert.value[6]
    publickey = pkinfo.value[1]
    pkvalue   = publickey.value
    digest = OpenSSL::Digest.digest('SHA1', pkvalue)
    if hex
      digest.unpack("H2"*20).join(":").upcase
    else
      digest
    end
  end

  def openssl?(major = nil, minor = nil, fix = nil, patch = 0, status = 0)
    return false if OpenSSL::OPENSSL_VERSION.include?("LibreSSL")
    return true unless major
    OpenSSL::OPENSSL_VERSION_NUMBER >=
      major * 0x10000000 + minor * 0x100000 + fix * 0x1000 + patch * 0x10 +
      status * 0x1
  end

  def libressl?(major = nil, minor = nil, fix = nil)
    version = OpenSSL::OPENSSL_VERSION.scan(/LibreSSL (\d+)\.(\d+)\.(\d+).*/)[0]
    return false unless version
    !major || (version.map(&:to_i) <=> [major, minor, fix]) >= 0
  end
end

class OpenSSL::TestCase < Test::Unit::TestCase
  include OpenSSL::TestUtils
  extend OpenSSL::TestUtils
  include Test::Unit::CoreAssertions

  def setup
    if ENV["OSSL_GC_STRESS"] == "1"
      GC.stress = true
    end
  end

  def teardown
    if ENV["OSSL_GC_STRESS"] == "1"
      GC.stress = false
    end
    # OpenSSL error stack must be empty
    assert_equal([], OpenSSL.errors)
  end

  # Omit the tests in FIPS.
  #
  # For example, the password based encryption used in the PEM format uses MD5
  # for deriving the encryption key from the password, and MD5 is not
  # FIPS-approved.
  #
  # See https://github.com/openssl/openssl/discussions/21830#discussioncomment-6865636
  # for details.
  def omit_on_fips
    return unless OpenSSL.fips_mode

    omit <<~MESSAGE
      Only for OpenSSL non-FIPS with the following possible reasons:
      * A testing logic is non-FIPS specific.
      * An encryption used in the test is not FIPS-approved.
    MESSAGE
  end

  def omit_on_non_fips
    return if OpenSSL.fips_mode

    omit "Only for OpenSSL FIPS"
  end
end

class OpenSSL::SSLTestCase < OpenSSL::TestCase
  RUBY = EnvUtil.rubybin
  ITERATIONS = ($0 == __FILE__) ? 100 : 10

  def setup
    super
    @ca_key  = Fixtures.pkey("rsa-1")
    @svr_key = Fixtures.pkey("rsa-2")
    @cli_key = Fixtures.pkey("rsa-3")
    @ca  = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/CN=CA")
    @svr = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/CN=localhost")
    @cli = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/CN=localhost")
    ca_exts = [
      ["basicConstraints","CA:TRUE",true],
      ["keyUsage","cRLSign,keyCertSign",true],
    ]
    ee_exts = [
      ["keyUsage","keyEncipherment,digitalSignature",true],
    ]
    @ca_cert  = issue_cert(@ca, @ca_key, 1, ca_exts, nil, nil)
    @svr_cert = issue_cert(@svr, @svr_key, 2, ee_exts, @ca_cert, @ca_key)
    @cli_cert = issue_cert(@cli, @cli_key, 3, ee_exts, @ca_cert, @ca_key)
    @server = nil
  end

  def tls13_supported?
    return false unless defined?(OpenSSL::SSL::TLS1_3_VERSION)
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.min_version = ctx.max_version = OpenSSL::SSL::TLS1_3_VERSION
    true
  rescue
  end

  def readwrite_loop(ctx, ssl)
    while line = ssl.gets
      ssl.write(line)
    end
  end

  def start_server(verify_mode: OpenSSL::SSL::VERIFY_NONE, start_immediately: true,
                   ctx_proc: nil, server_proc: method(:readwrite_loop),
                   accept_proc: proc{},
                   ignore_listener_error: false, &block)
    IO.pipe {|stop_pipe_r, stop_pipe_w|
      store = OpenSSL::X509::Store.new
      store.add_cert(@ca_cert)
      store.purpose = OpenSSL::X509::PURPOSE_SSL_CLIENT
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.cert_store = store
      ctx.cert = @svr_cert
      ctx.key = @svr_key
      ctx.verify_mode = verify_mode
      ctx_proc.call(ctx) if ctx_proc

      Socket.do_not_reverse_lookup = true
      tcps = TCPServer.new("127.0.0.1", 0)
      port = tcps.connect_address.ip_port

      ssls = OpenSSL::SSL::SSLServer.new(tcps, ctx)
      ssls.start_immediately = start_immediately

      threads = []
      begin
        server_thread = Thread.new do
          Thread.current.report_on_exception = false

          begin
            loop do
              begin
                readable, = IO.select([ssls, stop_pipe_r])
                break if readable.include? stop_pipe_r
                ssl = ssls.accept
                accept_proc.call(ssl)
              rescue OpenSSL::SSL::SSLError, IOError, Errno::EBADF, Errno::EINVAL,
                     Errno::ECONNABORTED, Errno::ENOTSOCK, Errno::ECONNRESET
                retry if ignore_listener_error
                raise
              end

              th = Thread.new do
                Thread.current.report_on_exception = false

                begin
                  server_proc.call(ctx, ssl)
                ensure
                  ssl.close
                end
                true
              end
              threads << th
            end
          ensure
            tcps.close
          end
        end

        client_thread = Thread.new do
          Thread.current.report_on_exception = false

          begin
            block.call(port)
          ensure
            # Stop accepting new connection
            stop_pipe_w.close
            server_thread.join
          end
        end
        threads.unshift client_thread
      ensure
        # Terminate existing connections. If a thread did 'pend', re-raise it.
        pend = nil
        threads.each { |th|
          begin
            timeout = EnvUtil.apply_timeout_scale(30)
            th.join(timeout) or
              th.raise(RuntimeError, "[start_server] thread did not exit in #{timeout} secs")
          rescue Test::Unit::PendedError
            pend = $!
          rescue Exception
          end
        }
        raise pend if pend
        assert_join_threads(threads)
      end
    }
  end
end

class OpenSSL::PKeyTestCase < OpenSSL::TestCase
  def check_component(base, test, keys)
    keys.each { |comp|
      assert_equal base.send(comp), test.send(comp)
    }
  end
end

module OpenSSL::Certs
  include OpenSSL::TestUtils

  module_function

  def ca_cert
    ca = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/CN=Timestamp Root CA")

    ca_exts = [
      ["basicConstraints","CA:TRUE,pathlen:1",true],
      ["keyUsage","keyCertSign, cRLSign",true],
      ["subjectKeyIdentifier","hash",false],
      ["authorityKeyIdentifier","keyid:always",false],
    ]
    OpenSSL::TestUtils.issue_cert(ca, Fixtures.pkey("rsa2048"), 1, ca_exts, nil, nil)
  end

  def ts_cert_direct(key, ca_cert)
    dn = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/OU=Timestamp/CN=Server Direct")

    exts = [
      ["basicConstraints","CA:FALSE",true],
      ["keyUsage","digitalSignature, nonRepudiation", true],
      ["subjectKeyIdentifier", "hash",false],
      ["authorityKeyIdentifier","keyid,issuer", false],
      ["extendedKeyUsage", "timeStamping", true]
    ]

    OpenSSL::TestUtils.issue_cert(dn, key, 2, exts, ca_cert, Fixtures.pkey("rsa2048"))
  end

  def intermediate_cert(key, ca_cert)
    dn = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/OU=Timestamp/CN=Timestamp Intermediate CA")

    exts = [
      ["basicConstraints","CA:TRUE,pathlen:0",true],
      ["keyUsage","keyCertSign, cRLSign",true],
      ["subjectKeyIdentifier","hash",false],
      ["authorityKeyIdentifier","keyid:always",false],
    ]

    OpenSSL::TestUtils.issue_cert(dn, key, 3, exts, ca_cert, Fixtures.pkey("rsa2048"))
  end

  def ts_cert_ee(key, intermediate, im_key)
    dn = OpenSSL::X509::Name.parse("/DC=org/DC=ruby-lang/OU=Timestamp/CN=Server End Entity")

    exts = [
      ["keyUsage","digitalSignature, nonRepudiation", true],
      ["subjectKeyIdentifier", "hash",false],
      ["authorityKeyIdentifier","keyid,issuer", false],
      ["extendedKeyUsage", "timeStamping", true]
    ]

    OpenSSL::TestUtils.issue_cert(dn, key, 4, exts, intermediate, im_key)
  end
end

end
